gRPC with Python
The Surprising Part: Your REST API Is Slow Between Services
```python
# REST call between internal services (what most teams do)
import httpx


async def classify_document_rest(text: str) -> dict:
    # A new AsyncClient per call opens a new connection every time;
    # reusing one long-lived client is what enables keep-alive.
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://classification-service/classify",
            json={"text": text},  # JSON serialisation: ~0.5 ms (illustrative)
            headers={"Content-Type": "application/json"},
        )
        return response.json()  # JSON deserialisation: ~0.3 ms (illustrative)

# What this costs per call (illustrative figures; measure your own):
# - TCP handshake (if no keep-alive): 1 RTT before the request can be sent
# - TLS handshake (HTTPS only): 1 RTT with TLS 1.3, 2 RTT with TLS 1.2
# - HTTP/1.1 header overhead: 200-800 bytes of ASCII headers
# - JSON serialisation + deserialisation: assumed ~0.8 ms for a 1 KB payload
# - HTTP/1.1: one in-flight request per connection in practice
#   (pipelining exists but is rarely usable); HTTP/2 multiplexes instead
# At 10,000 classification calls per minute, JSON overhead alone = 8 seconds of CPU time every minute
```
```python
# gRPC call - same operation
from classification_pb2 import ClassifyRequest, DocumentType
from classification_pb2_grpc import DocumentClassifierStub


async def classify_document_grpc(text: str, stub: DocumentClassifierStub) -> dict:
    # `stub` is created once from a long-lived grpc.aio channel and reused
    request = ClassifyRequest(text=text)
    response = await stub.Classify(request)  # Protobuf: ~0.05 ms to serialise (illustrative)
    if not response.results:
        return {"label": None, "confidence": 0.0}
    top = response.results[0]
    return {"label": DocumentType.Name(top.document_type), "confidence": top.confidence}

# What this costs per call:
# - Reuses the channel's existing HTTP/2 connection (no new handshake)
# - Protobuf binary: typically 3-10x smaller than equivalent JSON for structured,
#   number-heavy data; much less for payloads that are mostly free text
# - HTTP/2 multiplexing: no HTTP-level head-of-line blocking (TCP-level still applies)
# - Generated code: no runtime field name parsing
# - At 10,000 calls per minute: ~500 ms CPU time for serialisation
```
The difference is not always significant. For a service called 10 times per minute, REST is fine. For a service called 10,000 times per minute by multiple callers, gRPC is worth the setup cost. This lesson will show you exactly when, and how.
What You Will Learn
- Why gRPC - REST vs gRPC comparison, when each wins
- Protocol Buffers - .proto file anatomy, types, generating Python code
- Unary RPC - complete client and server implementation
- Server-side streaming - streaming batch results back
- Client-side streaming - streaming uploads
- Bidirectional streaming - real-time processing
- Interceptors - logging, auth, retry at the gRPC layer
- Error Handling - StatusCode, structured errors, HTTP mapping
- Decision Guide - concrete criteria for gRPC vs REST
Prerequisites: Python async/await, FastAPI basics (Lesson 01), Docker.
Part 1: REST vs gRPC - An Honest Comparison
The Comparison Table
| Feature | REST / JSON | gRPC / Protobuf |
|---|---|---|
| Protocol | HTTP/1.1 or HTTP/2 | HTTP/2 (required) |
| Payload format | JSON (text) | Protocol Buffers (binary) |
| Payload size | Larger (field names repeated every message) | 3–10x smaller |
| Serialisation speed | ~0.8 ms for 1 KB (illustrative) | ~0.05 ms for equivalent (illustrative) |
| Schema enforcement | Optional (OpenAPI, manual) | Required (.proto files) |
| Code generation | Optional (openapi-generator) | Required (protoc) |
| Browser support | Native | Requires grpc-web proxy |
| Streaming | SSE, WebSocket (separate protocols) | Native (4 patterns) |
| Load balancing | L4 or L7 (any reverse proxy) | L7 HTTP/2-aware proxy or client-side balancing (L4 pins each long-lived connection to one backend) |
| Debugging | curl, browser, Postman | grpcurl, grpc-client-cli |
| Learning curve | Low | Medium-high |
| When to choose | Public APIs, browser clients, simple internal | High-frequency internal, streaming, polyglot |
The Honest Benchmark Thought Experiment
Consider classifying 1 million documents per day:
REST / JSON:
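- 1,000,000 calls × 0.8 ms serialisation (assumed per-call cost) = 800 seconds of CPU
- Payload: 1 KB JSON per request = 1 GB of data transferred (internal network)
- Connection overhead: if not reusing keep-alive, adds one or more round trips (typically 1-3 ms) per call
gRPC / Protobuf:
- 1,000,000 calls × 0.05 ms serialisation (assumed) = 50 seconds of CPU (16x less)
- Payload: ~100 bytes protobuf per request = 100 MB of data transferred
- Connection: HTTP/2 multiplexes all calls over persistent connections
For this workload, gRPC saves ~750 CPU-seconds per day and 900 MB of internal network traffic. Those figures assume a payload made mostly of short fields and numbers. If the 1 KB is mostly document text, as in a classification request, Protobuf saves only the field names, quotes, and escaping, so the bandwidth gap shrinks to a few percent; the CPU figures also depend heavily on the runtime and library versions. For a smaller workload (1,000 calls/day), the difference is negligible.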
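The per-call costs used in this thought experiment are rough assumptions, and JSON cost varies a lot between runtimes and library versions. A minimal sketch for checking your own numbers, assuming a synthetic ~1 KB request (the payload shape is made up for illustration): it times a `json.dumps`/`json.loads` round trip with `timeit`, then scales the result to a daily call volume the same way the calculation above does. `json_round_trip_ms` and `daily_overhead` are hypothetical helper names.

```python
import json
import timeit


def json_round_trip_ms(payload: dict, repeats: int = 2_000) -> float:
    """Average milliseconds for one json.dumps + json.loads round trip."""
    def round_trip() -> None:
        json.loads(json.dumps(payload))

    total_seconds = timeit.timeit(round_trip, number=repeats)
    return total_seconds / repeats * 1000


def daily_overhead(calls_per_day: int, serialise_ms: float, payload_bytes: int) -> tuple[float, float]:
    """Scale per-call costs to one day: (CPU seconds, megabytes transferred)."""
    cpu_seconds = calls_per_day * serialise_ms / 1000
    megabytes = calls_per_day * payload_bytes / 1_000_000
    return cpu_seconds, megabytes


if __name__ == "__main__":
    # Hypothetical ~1 KB request: mostly free text plus a few metadata fields
    payload = {"text": "x" * 950, "document_id": "doc-001", "max_results": 3}
    size = len(json.dumps(payload).encode("utf-8"))
    ms = json_round_trip_ms(payload)
    cpu, mb = daily_overhead(1_000_000, ms, size)
    print(f"{size} bytes, {ms:.4f} ms per round trip -> {cpu:.0f} CPU-s and {mb:.0f} MB per day")
```

Plugging the thought experiment's assumed 0.8 ms and 1,000 bytes into `daily_overhead` reproduces its 800 CPU-seconds and 1 GB; plugging in a measured value shows how much of that gap exists on your own stack.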
Part 2: Protocol Buffers
The .proto File
```protobuf
// protos/classification.proto
syntax = "proto3";

package classification;

// Go package option (used when generating Go code from this proto)
option go_package = "github.com/example/doc-intelligence/classification";

// Python needs no package option (protobuf has no python_package option):
// protoc names the generated modules after this file, classification_pb2.py
// and classification_pb2_grpc.py.

// ─── Scalar Types ───────────────────────────────────────────────────
// double, float, int32, int64, uint32, uint64, sint32, sint64,
// fixed32, fixed64, sfixed32, sfixed64, bool, string, bytes

// ─── Enum ──────────────────────────────────────────────────────────
enum DocumentType {
  DOCUMENT_TYPE_UNSPECIFIED = 0;  // proto3 requires the first value to be 0 (the default)
  DOCUMENT_TYPE_INVOICE = 1;
  DOCUMENT_TYPE_CONTRACT = 2;
  DOCUMENT_TYPE_REPORT = 3;
  DOCUMENT_TYPE_RECEIPT = 4;
  DOCUMENT_TYPE_LETTER = 5;
}

// ─── Messages ──────────────────────────────────────────────────────
// Request message - what the client sends
message ClassifyRequest {
  string text = 1;           // Field number 1 - unique, never reuse
  string document_id = 2;    // For logging and idempotency
  string model_version = 3;  // Optionally request a specific model
  int32 max_results = 4;     // How many labels to return (top-N)
}

// Response message - what the server returns
message ClassifyResponse {
  string document_id = 1;
  repeated ClassificationResult results = 2;  // 'repeated' = list/array
  string model_version = 3;
  int64 processing_time_ms = 4;
}

// Message used as a field type (top-level, referenced by ClassifyResponse.results)
message ClassificationResult {
  DocumentType document_type = 1;
  float confidence = 2;            // 0.0 to 1.0
  repeated string keywords = 3;    // a repeated field inside a repeated message is fine
}

// Batch request - uses 'repeated' for input too
message ClassifyBatchRequest {
  repeated ClassifyRequest requests = 1;
  bool include_keywords = 2;
}

// For streaming - streamed one at a time
message ClassifyBatchResponse {
  ClassifyResponse result = 1;
  int32 sequence_number = 2;  // Position in batch
  bool is_last = 3;
}

// For upload streaming
message DocumentChunk {
  bytes data = 1;          // Chunk of binary data
  string document_id = 2;  // Stable across all chunks
  int32 chunk_number = 3;
  bool is_last = 4;
  string mime_type = 5;    // Only sent in first chunk
}

message UploadResponse {
  string document_id = 1;
  int64 total_bytes = 2;
  int32 total_chunks = 3;
  string storage_key = 4;
}

// oneof - at most one of these fields is set at a time
message ClassificationEvent {
  oneof event_type {
    ClassifyRequest classification_started = 1;
    ClassifyResponse classification_complete = 2;
    ErrorEvent classification_failed = 3;
  }
  string correlation_id = 4;
}

// map<key_type, value_type> - dictionary
message ModelMetadata {
  string model_name = 1;
  string version = 2;
  map<string, string> labels = 3;     // label_id -> label_name
  map<string, float> thresholds = 4;  // label_id -> confidence threshold
}

message ErrorEvent {
  string code = 1;
  string message = 2;
  map<string, string> metadata = 3;
}

// ─── Service Definition ─────────────────────────────────────────────
service DocumentClassifier {
  // Unary: client sends ONE request, server returns ONE response
  rpc Classify(ClassifyRequest) returns (ClassifyResponse);

  // Server-side streaming: client sends ONE request, server streams MANY responses
  rpc ClassifyBatch(ClassifyBatchRequest) returns (stream ClassifyBatchResponse);

  // Client-side streaming: client streams MANY requests, server returns ONE response
  rpc UploadDocument(stream DocumentChunk) returns (UploadResponse);

  // Bidirectional streaming: both sides stream simultaneously
  rpc ClassifyStream(stream ClassificationEvent) returns (stream ClassificationEvent);
}
```
Generating Python Code
```bash
# Install dependencies (reflection, health checking and rich status are used later in this lesson)
pip install grpcio grpcio-tools grpcio-reflection grpcio-health-checking grpcio-status

# Generate Python code from the proto file
# This creates two files:
#   classification_pb2.py       - message classes
#   classification_pb2_grpc.py  - service stubs and servicers
python -m grpc_tools.protoc \
    -I./protos \
    --python_out=./classification_service \
    --grpc_python_out=./classification_service \
    ./protos/classification.proto

# If you have multiple proto files that import each other:
python -m grpc_tools.protoc \
    -I./protos \
    --python_out=./generated \
    --grpc_python_out=./generated \
    ./protos/classification.proto \
    ./protos/common.proto

# For better type hints (optional but recommended);
# mypy-protobuf installs the protoc-gen-mypy plugin that --mypy_out uses:
pip install grpc-stubs mypy-protobuf
python -m grpc_tools.protoc \
    -I./protos \
    --python_out=./generated \
    --grpc_python_out=./generated \
    --mypy_out=./generated \
    ./protos/classification.proto
```
Understanding the Generated Python Classes
```python
# What protoc generates - simplified view
# classification_pb2.py (generated, do not edit)
from classification_pb2 import (
    ClassificationEvent,
    ClassificationResult,
    ClassifyRequest,
    ClassifyResponse,
    DocumentType,
    ModelMetadata,
)

# Message classes are generated as Python classes
# Field access uses attribute syntax
request = ClassifyRequest(
    text="This invoice is due on January 15th...",
    document_id="doc-001",
    max_results=3,
)
print(request.text)         # This invoice is due on January 15th...
print(request.document_id)  # doc-001
print(request.max_results)  # 3

# Serialisation
serialised = request.SerializeToString()  # bytes
print(len(serialised))  # compact binary: field tags instead of field names

# Deserialisation
recovered = ClassifyRequest()
recovered.ParseFromString(serialised)
assert recovered == request

# Working with repeated fields (lists)
response = ClassifyResponse()
response.results.append(
    ClassificationResult(
        document_type=DocumentType.DOCUMENT_TYPE_INVOICE,
        confidence=0.97,
    )
)
response.results[0].keywords.extend(["invoice", "payment", "due date"])

# Working with oneof
event = ClassificationEvent(
    correlation_id="corr-xyz",
    classification_started=ClassifyRequest(text="test"),
)
# Check which field is set
print(event.WhichOneof("event_type"))  # classification_started

# Working with map
metadata = ModelMetadata()
metadata.labels["INV"] = "Invoice"
metadata.labels["CON"] = "Contract"
metadata.thresholds["INV"] = 0.85
print(dict(metadata.labels))  # e.g. {'INV': 'Invoice', 'CON': 'Contract'} (map order is not guaranteed)
```
Part 3: Unary RPC - Complete Server and Client
The gRPC Server
```python
# classification_service/server.py
import asyncio
import logging
import time

import grpc
from grpc import aio
from grpc_health.v1 import health, health_pb2, health_pb2_grpc  # grpcio-health-checking
from grpc_reflection.v1alpha import reflection  # grpcio-reflection

import classification_pb2 as pb2
import classification_pb2_grpc as pb2_grpc
from classification_service.model import DocumentClassifierModel
from classification_service.interceptors import LoggingInterceptor, AuthInterceptor

logger = logging.getLogger("classification_service.grpc")


class DocumentClassifierServicer(pb2_grpc.DocumentClassifierServicer):
    """
    Implements the DocumentClassifier gRPC service.
    Inherits from the generated servicer base class.
    """

    def __init__(self, model: DocumentClassifierModel):
        self._model = model

    async def Classify(
        self,
        request: pb2.ClassifyRequest,
        context: grpc.aio.ServicerContext,
    ) -> pb2.ClassifyResponse:
        """
        Unary RPC - one request, one response.
        'context' gives access to request metadata, deadline, peer info.
        """
        start = time.perf_counter()

        # Access request metadata (headers in gRPC)
        metadata = dict(context.invocation_metadata())
        correlation_id = metadata.get("x-correlation-id", "unknown")

        logger.info(
            "Classify RPC called",
            extra={
                "document_id": request.document_id,
                "text_length": len(request.text),
                "correlation_id": correlation_id,
                "peer": context.peer(),  # Client address
            },
        )

        # Validate request. context.abort() raises, ending the RPC with this status.
        if not request.text.strip():
            await context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "text field is required and cannot be empty",
            )
            return pb2.ClassifyResponse()  # Never reached, but satisfies type checker

        if len(request.text) > 100_000:
            await context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "text exceeds maximum length of 100,000 characters",
            )

        # Check if the client has already cancelled (important for long operations)
        if context.cancelled():
            logger.info(f"Client cancelled request for doc {request.document_id}")
            return pb2.ClassifyResponse()

        try:
            # ML inference is synchronous, so run it in the default thread pool
            # to keep the event loop free for other RPCs
            loop = asyncio.get_running_loop()
            results = await loop.run_in_executor(
                None,  # Use default thread pool executor
                self._model.classify,
                request.text,
                request.max_results or 3,  # proto3 default 0 means "not set"
            )
        except Exception as exc:
            logger.error(f"Model inference failed: {exc}", exc_info=True)
            await context.abort(
                grpc.StatusCode.INTERNAL,
                "Classification model error",
            )
            return pb2.ClassifyResponse()

        elapsed_ms = int((time.perf_counter() - start) * 1000)

        # Build the response protobuf message
        response = pb2.ClassifyResponse(
            document_id=request.document_id,
            model_version=self._model.version,
            processing_time_ms=elapsed_ms,
        )
        # The model returns (DocumentType value, confidence, keywords) tuples
        for label, confidence, keywords in results:
            result = pb2.ClassificationResult(
                document_type=label,
                confidence=confidence,
            )
            result.keywords.extend(keywords)
            response.results.append(result)

        return response


async def serve():
    """Start the gRPC server."""
    # Load model once at startup
    model = DocumentClassifierModel.load("models/classifier_v3.pt")

    # Create server with interceptors (the first one in the list runs outermost)
    server = aio.server(
        interceptors=[
            AuthInterceptor(),
            LoggingInterceptor(),
        ],
        options=[
            # Maximum message size: 50 MB (default receive limit is 4 MB)
            ("grpc.max_receive_message_length", 50 * 1024 * 1024),
            ("grpc.max_send_message_length", 50 * 1024 * 1024),
            # Keep-alive: ping clients every 30 seconds
            ("grpc.keepalive_time_ms", 30000),
            ("grpc.keepalive_timeout_ms", 5000),
        ],
    )

    pb2_grpc.add_DocumentClassifierServicer_to_server(
        DocumentClassifierServicer(model), server
    )

    # Standard health service, used by ClassifierGRPCClient.health_check();
    # the service name must match the one the client asks about
    health_servicer = health.aio.HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
    await health_servicer.set(
        "DocumentClassifier", health_pb2.HealthCheckResponse.SERVING
    )

    # Reflection allows grpcurl and other tools to discover your service
    reflection.enable_server_reflection(
        [
            pb2.DESCRIPTOR.services_by_name["DocumentClassifier"].full_name,
            reflection.SERVICE_NAME,
        ],
        server,
    )

    listen_addr = "0.0.0.0:50051"
    server.add_insecure_port(listen_addr)
    logger.info(f"Classification gRPC server starting on {listen_addr}")
    await server.start()

    try:
        await server.wait_for_termination()
    except asyncio.CancelledError:
        # Graceful shutdown - wait up to 5 seconds for in-flight RPCs
        await server.stop(grace=5)
        logger.info("gRPC server stopped")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(serve())
```
The gRPC Client
```python
# upload_service/clients/classifier.py
import json
import logging
from typing import NoReturn, Optional

import grpc
from grpc import aio

import classification_pb2 as pb2
import classification_pb2_grpc as pb2_grpc
from upload_service.exceptions import ClassificationServiceUnavailableError

logger = logging.getLogger("upload_service.classifier_client")

# Channel-level retry policy. It must sit inside a "methodConfig" entry that
# names the service (package.Service) it applies to; a bare top-level
# "retryPolicy" is not valid service config. maxAttempts includes the first call.
SERVICE_CONFIG = json.dumps(
    {
        "methodConfig": [
            {
                "name": [{"service": "classification.DocumentClassifier"}],
                "retryPolicy": {
                    "maxAttempts": 3,
                    "initialBackoff": "0.5s",
                    "maxBackoff": "5s",
                    "backoffMultiplier": 2,
                    "retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED"],
                },
            }
        ]
    }
)


class ClassifierGRPCClient:
    """
    Async gRPC client for the Classification Service.
    Reuses one persistent channel (a long-lived HTTP/2 connection) for all calls.
    """

    def __init__(self, host: str, port: int = 50051):
        self._host = host
        self._port = port
        self._channel: Optional[aio.Channel] = None
        self._stub: Optional[pb2_grpc.DocumentClassifierStub] = None

    async def connect(self) -> None:
        """Open the gRPC channel. Called at service startup."""
        self._channel = aio.insecure_channel(
            f"{self._host}:{self._port}",
            options=[
                ("grpc.max_receive_message_length", 50 * 1024 * 1024),
                # Let the channel retry failed calls per the service config;
                # reconnecting after a dropped connection happens automatically
                ("grpc.enable_retries", 1),
                ("grpc.service_config", SERVICE_CONFIG),
            ],
        )
        self._stub = pb2_grpc.DocumentClassifierStub(self._channel)
        # Channels connect lazily: the TCP/HTTP/2 connection opens on the first RPC
        logger.info(f"Channel opened to Classification Service at {self._host}:{self._port}")

    async def close(self) -> None:
        """Close the channel. Called at service shutdown."""
        if self._channel:
            await self._channel.close()

    async def health_check(self) -> bool:
        """Check if the Classification Service is reachable (standard gRPC health protocol)."""
        from grpc_health.v1 import health_pb2, health_pb2_grpc  # grpcio-health-checking

        health_stub = health_pb2_grpc.HealthStub(self._channel)
        try:
            response = await health_stub.Check(
                health_pb2.HealthCheckRequest(service="DocumentClassifier"),
                timeout=2.0,
            )
            return response.status == health_pb2.HealthCheckResponse.SERVING
        except grpc.RpcError:
            return False

    async def classify(
        self,
        text: str,
        document_id: str,
        correlation_id: str = "",
        timeout_seconds: float = 30.0,
    ) -> dict:
        """Unary RPC call with timeout and error mapping."""
        if not self._stub:
            raise RuntimeError("Client not connected. Call connect() first.")

        request = pb2.ClassifyRequest(
            text=text,
            document_id=document_id,
            max_results=3,
        )

        # Send correlation ID as gRPC metadata (equivalent to HTTP headers)
        metadata = []
        if correlation_id:
            metadata.append(("x-correlation-id", correlation_id))

        try:
            response: pb2.ClassifyResponse = await self._stub.Classify(
                request,
                timeout=timeout_seconds,
                metadata=metadata,
            )
            return {
                "document_id": response.document_id,
                "model_version": response.model_version,
                "processing_time_ms": response.processing_time_ms,
                "results": [
                    {
                        "label": pb2.DocumentType.Name(r.document_type),
                        "confidence": r.confidence,
                        "keywords": list(r.keywords),
                    }
                    for r in response.results
                ],
            }
        except grpc.RpcError as exc:
            self._handle_grpc_error(exc)  # always raises

    def _handle_grpc_error(self, exc: grpc.RpcError) -> NoReturn:
        """Map gRPC errors to domain exceptions. Always raises."""
        code = exc.code()
        details = exc.details()
        logger.error(
            f"gRPC error: {code.name} - {details}",
            extra={"grpc_code": code.name, "grpc_details": details},
        )
        if code == grpc.StatusCode.UNAVAILABLE:
            raise ClassificationServiceUnavailableError(
                f"Classification Service is unreachable: {details}"
            ) from exc
        elif code == grpc.StatusCode.DEADLINE_EXCEEDED:
            raise ClassificationServiceUnavailableError(
                "Classification Service timed out"
            ) from exc
        elif code == grpc.StatusCode.INVALID_ARGUMENT:
            raise ValueError(f"Invalid classification request: {details}") from exc
        elif code == grpc.StatusCode.INTERNAL:
            raise ClassificationServiceUnavailableError(
                f"Classification Service internal error: {details}"
            ) from exc
        else:
            raise ClassificationServiceUnavailableError(
                f"Unexpected gRPC error {code.name}: {details}"
            ) from exc
```
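How long the channel waits between attempts is easy to misjudge from the service config alone. The gRPC retry design (gRFC A6) draws each delay uniformly from zero up to the current backoff ceiling, which starts at `initialBackoff`, is multiplied by `backoffMultiplier` after each retry, and is capped at `maxBackoff`; `maxAttempts` counts the original call too. The sketch below is a model for reasoning about the client's policy (maxAttempts 3, initialBackoff 0.5s, maxBackoff 5s, multiplier 2), not code that gRPC runs; `retry_delays` is a hypothetical helper.

```python
import random
from typing import Callable


def retry_delays(
    max_attempts: int,
    initial_backoff: float,
    max_backoff: float,
    multiplier: float,
    rand: Callable[[], float] = random.random,
) -> list[float]:
    """Delay in seconds before each retry, per the gRFC A6 formula:
    retry n waits random(0, min(initial_backoff * multiplier**(n-1), max_backoff))."""
    delays = []
    ceiling = initial_backoff
    for _ in range(max_attempts - 1):  # the first attempt is not a retry
        delays.append(rand() * ceiling)
        ceiling = min(ceiling * multiplier, max_backoff)
    return delays


# Upper bounds for the client's policy (rand pinned to 1.0): two retries
print(retry_delays(3, 0.5, 5.0, 2, rand=lambda: 1.0))
```

So `maxAttempts: 3` means at most two retries, waiting under 0.5 s and then under 1 s. gRFC A6 also treats `maxAttempts` values above 5 as 5.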
Part 4: Server-Side Streaming
Server-side streaming is ideal for returning large results progressively.
```python
# classification_service/server.py - add to DocumentClassifierServicer
    async def ClassifyBatch(
        self,
        request: pb2.ClassifyBatchRequest,
        context: grpc.aio.ServicerContext,
    ) -> None:
        """
        Server-side streaming RPC.
        Client sends ONE request (a batch).
        Server streams MANY responses (one per document).
        This handler writes each message with context.write() and returns None;
        grpc.aio also accepts an async generator that yields the messages.
        """
        logger.info(f"ClassifyBatch called with {len(request.requests)} documents")
        loop = asyncio.get_running_loop()

        for i, classify_request in enumerate(request.requests):
            # Check if client cancelled between items
            if context.cancelled():
                logger.info("Client cancelled batch classification")
                return

            try:
                results = await loop.run_in_executor(
                    None, self._model.classify, classify_request.text, 3
                )
                response = pb2.ClassifyResponse(
                    document_id=classify_request.document_id,
                    model_version=self._model.version,
                )
                for label, confidence, keywords in results:
                    result = pb2.ClassificationResult(
                        document_type=label, confidence=confidence
                    )
                    result.keywords.extend(keywords)
                    response.results.append(result)

                # Stream this result back immediately - don't wait for all
                await context.write(
                    pb2.ClassifyBatchResponse(
                        result=response,
                        sequence_number=i,
                        is_last=(i == len(request.requests) - 1),
                    )
                )
            except Exception as exc:
                logger.error(f"Failed to classify document {classify_request.document_id}: {exc}")
                # Continue with other documents rather than failing the whole batch.
                # Note: if the last document fails, no message carries is_last=True.
```

```python
# Client consuming the stream (add to ClassifierGRPCClient)
    async def classify_batch(self, texts_and_ids: list[tuple[str, str]]) -> list[dict]:
        """Client-side iteration over a server-streaming RPC."""
        batch_request = pb2.ClassifyBatchRequest(
            requests=[
                pb2.ClassifyRequest(text=text, document_id=doc_id)
                for text, doc_id in texts_and_ids
            ],
            include_keywords=True,
        )

        results = []
        # The aio stub returns a call object that is an async iterator - use async for
        async for batch_response in self._stub.ClassifyBatch(batch_request, timeout=120.0):
            labels = batch_response.result.results
            results.append({
                "sequence": batch_response.sequence_number,
                "document_id": batch_response.result.document_id,
                "label": pb2.DocumentType.Name(labels[0].document_type) if labels else None,
            })
            if batch_response.is_last:
                break  # the loop also ends on its own when the server closes the stream

        return results
```
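The streaming and `is_last` logic can be exercised without a server. A stdlib-only sketch of the async-generator shape that grpc.aio also accepts for server-streaming handlers, with plain dicts standing in for `ClassifyBatchResponse` and a hypothetical `toy_classify` in place of the model. One design choice differs from the handler above, which logs and skips a failed document: here a failure still yields an entry with an `error` field, so the final message always carries `is_last=True`.

```python
import asyncio
from typing import AsyncIterator, Callable


async def stream_batch(texts: list[str], classify: Callable[[str], str]) -> AsyncIterator[dict]:
    """Yield one result per document as soon as it is ready (server-streaming shape)."""
    for i, text in enumerate(texts):
        entry = {"sequence_number": i, "is_last": i == len(texts) - 1}
        try:
            # Run the synchronous classifier off the event loop, as the servicer does
            entry["label"] = await asyncio.to_thread(classify, text)
        except Exception as exc:
            entry["error"] = str(exc)  # report the failure instead of skipping it
        yield entry


async def consume(texts: list[str], classify: Callable[[str], str]) -> list[dict]:
    """Client side: iterate the stream with async for and stop at is_last."""
    received = []
    async for entry in stream_batch(texts, classify):
        received.append(entry)
        if entry["is_last"]:
            break
    return received


def toy_classify(text: str) -> str:
    """Hypothetical stand-in for the model."""
    if not text:
        raise ValueError("empty document")
    return "INVOICE" if "invoice" in text else "LETTER"


if __name__ == "__main__":
    print(asyncio.run(consume(["invoice #1", "dear sir", ""], toy_classify)))
```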
Part 5: Client-Side Streaming
Client-side streaming is ideal for large file uploads broken into chunks.
```python
# classification_service/server.py - add to DocumentClassifierServicer
# (also needs `from typing import AsyncIterator` at the top of the module)
    async def UploadDocument(
        self,
        request_iterator: AsyncIterator[pb2.DocumentChunk],
        context: grpc.aio.ServicerContext,
    ) -> pb2.UploadResponse:
        """
        Client-side streaming RPC.
        Client streams MANY requests (file chunks).
        Server returns ONE response (upload complete).
        """
        chunks = []
        document_id = None
        mime_type = None
        total_bytes = 0

        # Iterate over the client's stream of chunks (the first argument, not the
        # context); gRPC delivers messages within a stream in the order they were sent
        async for chunk in request_iterator:
            if document_id is None:
                document_id = chunk.document_id
                mime_type = chunk.mime_type  # only set on the first chunk
            chunks.append(chunk.data)
            total_bytes += len(chunk.data)
            logger.debug(
                f"Received chunk {chunk.chunk_number} for doc {chunk.document_id} "
                f"({len(chunk.data)} bytes)"
            )
            if total_bytes > 50 * 1024 * 1024:  # 50 MB limit
                await context.abort(
                    grpc.StatusCode.INVALID_ARGUMENT,
                    "Document exceeds maximum size of 50 MB",
                )
                return pb2.UploadResponse()

        if document_id is None:
            await context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "Upload stream contained no chunks",
            )

        # Reassemble the file
        file_bytes = b"".join(chunks)

        # Store it. self._storage is an async object-storage client that the
        # servicer is assumed to receive in its constructor (not shown above).
        storage_key = await self._storage.put(
            f"raw/{document_id}", file_bytes, content_type=mime_type
        )

        return pb2.UploadResponse(
            document_id=document_id,
            total_bytes=total_bytes,
            total_chunks=len(chunks),
            storage_key=storage_key,
        )
```

```python
# Upload client - streaming file chunks (add to ClassifierGRPCClient)
    async def upload_document(self, file_bytes: bytes, document_id: str, mime_type: str) -> dict:
        """Stream a file to the server in chunks."""
        CHUNK_SIZE = 256 * 1024  # 256 KB chunks

        async def chunk_generator():
            total_chunks = (len(file_bytes) + CHUNK_SIZE - 1) // CHUNK_SIZE  # ceiling division
            for i, offset in enumerate(range(0, len(file_bytes), CHUNK_SIZE)):
                chunk_data = file_bytes[offset:offset + CHUNK_SIZE]
                yield pb2.DocumentChunk(
                    data=chunk_data,
                    document_id=document_id,
                    chunk_number=i,
                    is_last=(i == total_chunks - 1),
                    mime_type=mime_type if i == 0 else "",  # Only send mime_type once
                )

        # The aio stub accepts an async iterator of requests for client streaming
        response = await self._stub.UploadDocument(chunk_generator(), timeout=60.0)
        return {
            "document_id": response.document_id,
            "total_bytes": response.total_bytes,
            "total_chunks": response.total_chunks,
            "storage_key": response.storage_key,
        }
```
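The chunking and reassembly logic can be unit-tested without gRPC. A minimal sketch, assuming a plain `Chunk` dataclass as a hypothetical stand-in for `pb2.DocumentChunk`: `iter_chunks` mirrors the client's generator, and `reassemble` mirrors the server loop, including the 50 MB limit, while also rejecting an empty stream and out-of-order `chunk_number`s. Since gRPC already keeps messages in order within one stream, the order check only catches sender bugs.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class Chunk:
    """Hypothetical stand-in for pb2.DocumentChunk (same field names)."""
    data: bytes
    document_id: str
    chunk_number: int
    is_last: bool
    mime_type: str = ""


def iter_chunks(
    file_bytes: bytes, document_id: str, mime_type: str, chunk_size: int = 256 * 1024
) -> Iterator[Chunk]:
    """Split a file the way upload_document's generator does."""
    total_chunks = (len(file_bytes) + chunk_size - 1) // chunk_size  # ceiling division
    for i, offset in enumerate(range(0, len(file_bytes), chunk_size)):
        yield Chunk(
            data=file_bytes[offset:offset + chunk_size],
            document_id=document_id,
            chunk_number=i,
            is_last=(i == total_chunks - 1),
            mime_type=mime_type if i == 0 else "",  # only the first chunk carries it
        )


def reassemble(chunks: Iterable[Chunk], max_bytes: int = 50 * 1024 * 1024) -> tuple[str, str, bytes]:
    """Rebuild (document_id, mime_type, file_bytes), mirroring UploadDocument."""
    document_id, mime_type = None, ""
    parts: list[bytes] = []
    total = 0
    for expected, chunk in enumerate(chunks):
        if chunk.chunk_number != expected:
            raise ValueError(f"chunk {chunk.chunk_number} arrived, expected {expected}")
        if document_id is None:
            document_id, mime_type = chunk.document_id, chunk.mime_type
        total += len(chunk.data)
        if total > max_bytes:
            raise ValueError(f"document exceeds {max_bytes} bytes")
        parts.append(chunk.data)
    if document_id is None:
        raise ValueError("upload stream contained no chunks")
    return document_id, mime_type, b"".join(parts)
```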
Part 6: Bidirectional Streaming
Bidirectional streaming enables real-time, interactive protocols.
```python
# Interactive classification: the client streams ClassificationEvent requests
# and the server streams a result event back as each document finishes
# classification_service/server.py - add to DocumentClassifierServicer
    async def ClassifyStream(
        self,
        request_iterator,
        context: grpc.aio.ServicerContext,
    ):
        """
        Bidirectional streaming RPC.
        Both client and server stream simultaneously.
        Use for: real-time audio transcription, interactive NLP, game state sync.
        """
        loop = asyncio.get_running_loop()
        async for event in request_iterator:
            if context.cancelled():
                return

            # The event has a oneof field - handle each case
            event_type = event.WhichOneof("event_type")
            correlation_id = event.correlation_id

            if event_type == "classification_started":  # other event types are ignored
                classify_request = event.classification_started
                logger.info(
                    f"Stream: classifying doc {classify_request.document_id}",
                    extra={"correlation_id": correlation_id},
                )
                try:
                    results = await loop.run_in_executor(
                        None, self._model.classify, classify_request.text, 3
                    )
                    response_msg = pb2.ClassifyResponse(
                        document_id=classify_request.document_id,
                        model_version=self._model.version,
                    )
                    for label, confidence, _ in results:
                        response_msg.results.append(
                            pb2.ClassificationResult(
                                document_type=label, confidence=confidence
                            )
                        )
                    # Stream result back immediately
                    await context.write(
                        pb2.ClassificationEvent(
                            correlation_id=correlation_id,
                            classification_complete=response_msg,
                        )
                    )
                except Exception as exc:
                    await context.write(
                        pb2.ClassificationEvent(
                            correlation_id=correlation_id,
                            classification_failed=pb2.ErrorEvent(
                                code="INFERENCE_ERROR",
                                message=str(exc),
                            ),
                        )
                    )
```

```python
# Client: send and receive simultaneously (add to ClassifierGRPCClient)
    async def classify_stream(self, documents: list[dict]) -> list[dict]:
        results = []
        # Called without a request iterator, the aio call exposes write()/done_writing()
        call = self._stub.ClassifyStream()

        async def send_requests():
            for doc in documents:
                await call.write(pb2.ClassificationEvent(
                    correlation_id=doc["correlation_id"],
                    classification_started=pb2.ClassifyRequest(
                        text=doc["text"],
                        document_id=doc["id"],
                    ),
                ))
            await call.done_writing()  # half-close: no more requests from this side

        # Send and receive concurrently
        send_task = asyncio.create_task(send_requests())
        async for response_event in call:
            event_type = response_event.WhichOneof("event_type")
            if event_type == "classification_complete":
                complete = response_event.classification_complete
                results.append({
                    "id": complete.document_id,
                    "label": complete.results[0].document_type if complete.results else None,
                })
            elif event_type == "classification_failed":
                results.append({
                    "id": None,
                    "correlation_id": response_event.correlation_id,
                    "error": response_event.classification_failed.message,
                })

        await send_task
        return results
```
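Why send from a separate task instead of writing every request before reading? A stdlib-only sketch, using two bounded `asyncio.Queue`s as a rough stand-in for the two directions of the stream (HTTP/2 flow-control windows play a similar role in real gRPC): the fake server cannot take more input while its reply is unread, so a client that finishes writing before it starts reading gets stuck once both queues fill. `fake_bidi_server`, `run_client`, and `write_everything_first` are hypothetical names.

```python
import asyncio


async def fake_bidi_server(inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
    """Hypothetical stand-in for ClassifyStream: one reply per request, None ends the stream."""
    while (request := await inbox.get()) is not None:
        await outbox.put(f"classified:{request}")  # blocks while the client has not read
    await outbox.put(None)


async def run_client(docs: list[str]) -> list[str]:
    """Send from a separate task and read concurrently, like classify_stream()."""
    inbox, outbox = asyncio.Queue(maxsize=1), asyncio.Queue(maxsize=1)
    server = asyncio.create_task(fake_bidi_server(inbox, outbox))

    async def send() -> None:
        for doc in docs:
            await inbox.put(doc)  # like call.write(...)
        await inbox.put(None)     # like call.done_writing()

    sender = asyncio.create_task(send())
    replies = []
    while (reply := await outbox.get()) is not None:  # like `async for ... in call`
        replies.append(reply)
    await sender
    await server
    return replies


async def write_everything_first(docs: list[str], timeout: float = 0.2) -> bool:
    """Anti-pattern: finish writing before reading. Returns True if it got stuck."""
    inbox, outbox = asyncio.Queue(maxsize=1), asyncio.Queue(maxsize=1)
    server = asyncio.create_task(fake_bidi_server(inbox, outbox))

    async def write_all() -> None:
        for doc in docs:
            await inbox.put(doc)
        await inbox.put(None)

    try:
        await asyncio.wait_for(write_all(), timeout)
        return False
    except asyncio.TimeoutError:
        return True  # server waits for the client to read, client waits for the server
    finally:
        server.cancel()
```

With one document the anti-pattern happens to finish, which is why this kind of bug tends to appear only under larger batches.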
Part 7: Interceptors
Interceptors are gRPC's middleware - they run around every RPC call.
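gRPC gives server interceptors control in the order they are passed to `aio.server(interceptors=[...])`, so the first one wraps all the others. A stdlib-only sketch of that onion model, with plain async functions standing in for interceptors and the handler (`make_interceptor` and `chain` are hypothetical helpers, not gRPC APIs), shows the resulting call order for the `[AuthInterceptor(), LoggingInterceptor()]` list used in `serve()`:

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[str]]


def make_interceptor(name: str, log: list[str]) -> Callable[[Handler], Handler]:
    """Build a wrapper that records when it runs before and after the inner handler."""
    def wrap(inner: Handler) -> Handler:
        async def wrapped(request: str) -> str:
            log.append(f"{name}:before")
            response = await inner(request)
            log.append(f"{name}:after")
            return response
        return wrapped
    return wrap


def chain(handler: Handler, interceptors: list[Callable[[Handler], Handler]]) -> Handler:
    """Wrap so that interceptors[0] ends up outermost, matching gRPC's list order."""
    for interceptor in reversed(interceptors):
        handler = interceptor(handler)
    return handler


async def demo() -> tuple[str, list[str]]:
    log: list[str] = []

    async def handler(request: str) -> str:
        log.append("handler")
        return request.upper()

    pipeline = chain(handler, [make_interceptor("auth", log), make_interceptor("logging", log)])
    return await pipeline("classify"), log
```

A consequence of this ordering: a call rejected by the auth layer never reaches the logging layer, so put logging first if rejected calls should be logged too.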
Server-Side Logging Interceptor
```python
# classification_service/interceptors.py
import logging
import time

import grpc
from grpc import aio

logger = logging.getLogger("classification_service.interceptors")


class LoggingInterceptor(aio.ServerInterceptor):
    """Logs unary-unary RPCs with timing and correlation ID.

    Streaming RPCs are passed through unchanged; wrapping them needs the
    matching unary_stream / stream_unary / stream_stream method handlers.
    """

    async def intercept_service(
        self,
        continuation,
        handler_call_details: grpc.HandlerCallDetails,
    ):
        method = handler_call_details.method
        handler = await continuation(handler_call_details)
        # Unknown method (None) or a streaming RPC: return it untouched
        if handler is None or handler.unary_unary is None:
            return handler
        inner = handler.unary_unary  # the servicer's async method

        async def intercepted_handler(request, servicer_context):
            start = time.perf_counter()  # time the call itself, not the handler lookup
            metadata = dict(servicer_context.invocation_metadata())
            correlation_id = metadata.get("x-correlation-id", "none")
            logger.info(
                f"RPC started: {method}",
                extra={"method": method, "correlation_id": correlation_id},
            )
            try:
                response = await inner(request, servicer_context)
            except Exception as exc:
                elapsed = (time.perf_counter() - start) * 1000
                logger.error(
                    f"RPC failed: {method} ({elapsed:.1f}ms): {exc}",
                    extra={"method": method, "duration_ms": elapsed, "error": str(exc)},
                )
                raise
            elapsed = (time.perf_counter() - start) * 1000
            logger.info(
                f"RPC complete: {method} ({elapsed:.1f}ms)",
                extra={"method": method, "duration_ms": elapsed, "correlation_id": correlation_id},
            )
            return response

        # Keep the generated (de)serializers, otherwise protobuf decoding is lost
        return grpc.unary_unary_rpc_method_handler(
            intercepted_handler,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )
```
Server-Side Auth Interceptor
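```python
import contextvars

# verify_jwt and InvalidTokenError are assumed project helpers (e.g. wrapping PyJWT)
from classification_service.auth import InvalidTokenError, verify_jwt

# Authenticated user for the current RPC. aio ServicerContext objects do not
# accept arbitrary attributes, so a ContextVar carries the user instead.
current_user: contextvars.ContextVar = contextvars.ContextVar("current_user", default=None)


def _abort_handler(code: grpc.StatusCode, details: str):
    """Handler that ends the RPC with the given status."""
    async def abort(request, servicer_context):
        await servicer_context.abort(code, details)
    return grpc.unary_unary_rpc_method_handler(abort)


class AuthInterceptor(aio.ServerInterceptor):
    """Validates JWT tokens in gRPC metadata."""

    UNAUTHENTICATED_METHODS = {"/grpc.health.v1.Health/Check"}

    async def intercept_service(self, continuation, handler_call_details):
        if handler_call_details.method in self.UNAUTHENTICATED_METHODS:
            return await continuation(handler_call_details)

        # Metadata is available here, before any handler runs,
        # so unary and streaming RPCs are both checked
        metadata = dict(handler_call_details.invocation_metadata or ())
        token = metadata.get("authorization", "")
        if not token.startswith("Bearer "):
            return _abort_handler(
                grpc.StatusCode.UNAUTHENTICATED,
                "Missing or invalid Authorization metadata",
            )
        try:
            user = await verify_jwt(token[len("Bearer "):])
        except InvalidTokenError:
            return _abort_handler(grpc.StatusCode.UNAUTHENTICATED, "Invalid or expired token")

        handler = await continuation(handler_call_details)
        if handler is None or handler.unary_unary is None:
            return handler  # streaming RPCs: authenticated, user not propagated in this sketch
        inner = handler.unary_unary

        async def auth_handler(request, servicer_context):
            # Make user available to the servicer via current_user.get()
            reset_token = current_user.set(user)
            try:
                return await inner(request, servicer_context)
            finally:
                current_user.reset(reset_token)

        return grpc.unary_unary_rpc_method_handler(
            auth_handler,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )
```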
Client-Side Retry Interceptor
```python
# upload_service/clients/interceptors.py
import asyncio
import logging
import random

import grpc

logger = logging.getLogger("upload_service.retry_interceptor")


class RetryInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
    """Client-side retry for transient failures.

    Use either this interceptor or the channel's service-config retryPolicy,
    not both: combined, the attempts multiply.
    Register with aio.insecure_channel(target, interceptors=[RetryInterceptor()]).
    """

    RETRIABLE_CODES = {grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED}

    def __init__(self, max_retries: int = 3, initial_backoff: float = 0.5):
        self._max_retries = max_retries
        self._initial_backoff = initial_backoff

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        backoff = self._initial_backoff
        for attempt in range(self._max_retries + 1):
            try:
                call = await continuation(client_call_details, request)
                await call  # the RPC's error surfaces when the call itself is awaited
                return call
            except grpc.RpcError as exc:
                if exc.code() not in self.RETRIABLE_CODES or attempt == self._max_retries:
                    raise
                jitter = random.uniform(0, backoff * 0.1)
                logger.warning(
                    f"Retrying gRPC call after {exc.code().name} "
                    f"(retry {attempt + 1}/{self._max_retries}), "
                    f"sleeping {backoff + jitter:.2f}s"
                )
                await asyncio.sleep(backoff + jitter)
                backoff = min(backoff * 2, 30.0)
```
Part 8: Error Handling
gRPC Status Codes
| StatusCode | HTTP Equivalent | When to Use |
|---|---|---|
| OK | 200 | Success |
| CANCELLED | 499 | Client cancelled the request |
| UNKNOWN | 500 | Unknown error (fallback) |
| INVALID_ARGUMENT | 400 | Bad request data |
| DEADLINE_EXCEEDED | 504 | Operation timed out |
| NOT_FOUND | 404 | Resource not found |
| ALREADY_EXISTS | 409 | Duplicate resource |
| PERMISSION_DENIED | 403 | Authenticated but not authorised |
| UNAUTHENTICATED | 401 | Not authenticated |
| RESOURCE_EXHAUSTED | 429 | Rate limit, quota exceeded |
| FAILED_PRECONDITION | 422 (Google's canonical mapping uses 400) | System not in required state |
| ABORTED | 409 | Concurrency conflict, e.g. an aborted transaction |
| OUT_OF_RANGE | 400 | Value outside the valid range, e.g. reading past the end of a file |
| UNIMPLEMENTED | 501 | Method not implemented or not supported |
| INTERNAL | 500 | Internal server error |
| UNAVAILABLE | 503 | Service temporarily unavailable |
| DATA_LOSS | 500 | Unrecoverable data loss |
Structured Error Details
```python
# For rich error information, use google.rpc.Status with details
# (grpc_status comes from grpcio-status; google.rpc from googleapis-common-protos)
import grpc
from google.protobuf import any_pb2
from google.rpc import error_details_pb2, status_pb2
from grpc_status import rpc_status


async def Classify(self, request, context):
    if not request.text:
        # Rich error: BadRequest with field violations
        bad_request = error_details_pb2.BadRequest()
        field_violation = bad_request.field_violations.add()
        field_violation.field = "text"
        field_violation.description = "text is required and cannot be empty"

        # Details travel as google.protobuf.Any, so pack the BadRequest into one
        detail = any_pb2.Any()
        detail.Pack(bad_request)

        rich_status = status_pb2.Status(
            code=grpc.StatusCode.INVALID_ARGUMENT.value[0],  # numeric code
            message="Validation failed",
            details=[detail],
        )
        # to_status() encodes the rich status into trailing metadata;
        # abort() raises, ending the RPC with that code, message and metadata
        status = rpc_status.to_status(rich_status)
        await context.abort(status.code, status.details, status.trailing_metadata)


# Client extracting the rich error:
try:
    response = await stub.Classify(request)
except grpc.RpcError as exc:
    status = rpc_status.from_call(exc)  # None if the server sent no rich status
    if status:
        for detail in status.details:
            if detail.Is(error_details_pb2.BadRequest.DESCRIPTOR):
                bad_request = error_details_pb2.BadRequest()
                detail.Unpack(bad_request)
                for violation in bad_request.field_violations:
                    print(f"Field '{violation.field}': {violation.description}")
```
Mapping gRPC Errors to HTTP in Gateway Services
```python
# When your FastAPI gateway calls a gRPC service and needs to map errors to HTTP:
import grpc
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()  # your existing gateway app

GRPC_TO_HTTP_STATUS = {
    grpc.StatusCode.OK: 200,
    grpc.StatusCode.CANCELLED: 499,  # non-standard "client closed request" code
    grpc.StatusCode.INVALID_ARGUMENT: 400,
    grpc.StatusCode.OUT_OF_RANGE: 400,
    grpc.StatusCode.NOT_FOUND: 404,
    grpc.StatusCode.ALREADY_EXISTS: 409,
    grpc.StatusCode.ABORTED: 409,
    grpc.StatusCode.PERMISSION_DENIED: 403,
    grpc.StatusCode.UNAUTHENTICATED: 401,
    grpc.StatusCode.RESOURCE_EXHAUSTED: 429,
    grpc.StatusCode.FAILED_PRECONDITION: 422,
    grpc.StatusCode.UNIMPLEMENTED: 501,
    grpc.StatusCode.INTERNAL: 500,
    grpc.StatusCode.UNAVAILABLE: 503,
    grpc.StatusCode.DEADLINE_EXCEEDED: 504,
    grpc.StatusCode.DATA_LOSS: 500,
    grpc.StatusCode.UNKNOWN: 500,
}


# FastAPI exception handler for gRPC errors
async def grpc_error_handler(request: Request, exc: grpc.RpcError) -> JSONResponse:
    http_status = GRPC_TO_HTTP_STATUS.get(exc.code(), 500)
    return JSONResponse(
        status_code=http_status,
        content={
            "error_code": exc.code().name,
            "message": exc.details(),
            "request_id": getattr(request.state, "request_id", "unknown"),
        },
    )


# Register in FastAPI app (also matches subclasses such as grpc.aio.AioRpcError)
app.add_exception_handler(grpc.RpcError, grpc_error_handler)
```
Part 9: gRPC vs REST Decision Guide
Use this matrix for every new service communication interface:
| Question | If YES → | If NO → |
|---|---|---|
| Will browser clients call this directly? | REST | Either |
| Is this called > 5,000 times/minute? | gRPC | REST is fine |
| Does it need streaming (server-push, live updates)? | gRPC | Either |
| Are multiple teams in different languages consuming it? | gRPC (schema enforced) | REST |
| Is it a public-facing API (external developers)? | REST | Either |
| Does the payload contain binary data (files, embeddings)? | gRPC | Either |
| Do you need bidirectional real-time communication? | gRPC | Either |
| Is operational simplicity the top priority? | REST | Either |
| Is latency < 5 ms critical? | gRPC | Either |
The practical rule:
- Public APIs, browser calls, external developers → REST
- Internal high-frequency service calls, ML serving, binary payloads, streaming → gRPC
- Everything else → REST (simpler, more tooling, easier to debug)
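The practical rule can also be written down as a small function, for example to keep a design-review checklist consistent. This is a sketch using the matrix's own 5,000 calls per minute threshold; `choose_protocol` and its parameters are hypothetical, and the softer questions (team languages, operational simplicity) are left to judgement.

```python
def choose_protocol(
    browser_or_external_clients: bool,
    calls_per_minute: int,
    needs_streaming: bool,
    binary_payloads: bool,
) -> str:
    """Apply the practical rule: public/browser -> REST; hot internal paths -> gRPC; else REST."""
    if browser_or_external_clients:
        return "REST"
    if calls_per_minute > 5_000 or needs_streaming or binary_payloads:
        return "gRPC"
    return "REST"  # default: simpler, more tooling, easier to debug
```

Checking browser and external clients first encodes the rule that public interfaces stay REST even when they are busy; a team that puts gRPC behind a grpc-web proxy would reorder that check.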
Interview Patterns
Q: What is the difference between the four gRPC streaming patterns?
A: Unary (one request, one response) works like a normal function call. Server-streaming (one request, many responses) is like a server-push feed - used for batch results, live notifications. Client-streaming (many requests, one response) is like chunked upload - client sends data, server returns one confirmation. Bidirectional streaming (both sides stream simultaneously) is like a real-time duplex channel - used for interactive protocols like transcription, game state sync.
Q: Why does Protocol Buffers serialisation produce smaller payloads than JSON?
A: Three reasons. First, field names are not repeated: the key "document_type" costs 15 bytes (with its quotes) in every JSON message, while Protobuf sends a tag combining the field number and wire type, which is a single byte for field numbers 1–15. Second, Protobuf encodes integers as variable-length binary (varints) and floats as fixed 4- or 8-byte values, while JSON writes numbers as ASCII text. Third, Protobuf has no whitespace, quotes, or delimiters. For structured, number-heavy messages, 500 bytes of JSON is typically 50–150 bytes of Protobuf; for messages that are mostly free text the saving is much smaller, because string contents are sent as-is.
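The size argument can be checked by hand-encoding a message. A minimal sketch of the proto3 wire format for `ClassifyRequest` (fields 1, 2 and 4 from the .proto file), covering only varint and length-delimited fields; real code should use the generated `SerializeToString()`, and `encode_classify_request` is a hypothetical helper. It shows both sides of the claim: a short structured message shrinks by more than half, but a 1,000-character text field costs about the same in either format.

```python
import json


def encode_varint(value: int) -> bytes:
    """Base-128 varint: 7 bits per byte, high bit set on every byte but the last."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def tag(field_number: int, wire_type: int) -> bytes:
    """Field key = (field_number << 3) | wire_type, itself varint-encoded."""
    return encode_varint((field_number << 3) | wire_type)


def encode_string(field_number: int, text: str) -> bytes:
    data = text.encode("utf-8")
    return tag(field_number, 2) + encode_varint(len(data)) + data  # wire type 2 = LEN


def encode_classify_request(text: str, document_id: str, max_results: int) -> bytes:
    """proto3 omits fields that hold their default value ("" or 0)."""
    out = b""
    if text:
        out += encode_string(1, text)
    if document_id:
        out += encode_string(2, document_id)
    if max_results:
        out += tag(4, 0) + encode_varint(max_results)  # wire type 0 = VARINT
    return out


def json_size(text: str, document_id: str, max_results: int) -> int:
    payload = {"text": text, "document_id": document_id, "max_results": max_results}
    return len(json.dumps(payload, separators=(",", ":")).encode("utf-8"))


if __name__ == "__main__":
    for text in ("invoice due", "a" * 1000):
        proto = len(encode_classify_request(text, "doc-001", 3))
        print(f"text={len(text)} chars: protobuf {proto} B, JSON {json_size(text, 'doc-001', 3)} B")
```

For the short message this gives 24 bytes against 62 bytes of compact JSON; with 1,000 characters of text it is 1,014 against 1,051.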
Q: How do gRPC interceptors differ from FastAPI middleware?
A: Both are middleware, but scoped differently. FastAPI middleware operates at the HTTP request/response level - it sees raw HTTP headers, paths, status codes. gRPC interceptors operate at the RPC level - they see method names, protobuf messages, gRPC metadata, and StatusCode. You cannot write a gRPC interceptor in FastAPI middleware or vice versa. For a service that exposes both REST and gRPC, you need both.
Q: How do you handle gRPC error propagation across service boundaries?
A: Map gRPC StatusCode to HTTP status codes in gateway services using a lookup table (UNAVAILABLE → 503, INVALID_ARGUMENT → 400, etc.). For richer error details, use google.rpc.Status with the details field containing packed BadRequest, QuotaFailure, or ErrorInfo messages. Always propagate correlation IDs in gRPC metadata so errors can be traced across services in a tracing backend such as Jaeger.
